---
title: Testing | Dagster
description: Dagster enables you to build testable and maintainable data applications.
---

# Testing

Dagster enables you to build testable and maintainable data applications. It provides ways to allow you unit-test your data applications, separate business logic from environments, and set explicit expectations on uncontrollable inputs.

## Relevant APIs

| Name                                    | Description                                                                                                           |
| --------------------------------------- | --------------------------------------------------------------------------------------------------------------------- |
| <PyObject object="execute_pipeline"  /> | A method to execute a pipeline synchronously, typically for testing pipeline execution or running standalone scripts. |
| <PyObject object="execute_solid" />     | A method to execute a single solid in an ephemeral pipeline, intended to support unit tests.                          |

## Overview

In data applications, testing computations and pipelines is notoriously challenging. Because of this, they often go relatively untested before hitting production. If there is testing in place, these tests are often slow, not run during common developer workflows, and have limited value because of the inability to simulate conditions in the production environment.

We believe the underlying fact is that data applications encode much of their business logic in heavy, external systems. Examples include processing systems like Spark and data warehouses such as Snowflake and Redshift. It is difficult to structure software to isolate these dependencies or nearly impossible to run them in a lightweight manner.

This page demonstrates how Dagster addresses these challenges:

- It provides convenient ways to write [Unit Tests in Data Applications](#unit-tests-in-data-applications).
- It allows you to [Separate Business Logic from Environments](#separating-business-logic-from-environments) and, therefore, write lightweight tests.

## Unit Tests in Data Applications

Principal: Errors that can be caught by unit tests should be caught by unit tests.

Corollary: Do not attempt to unit test for errors that unit tests cannot catch.

Using unit tests without keeping these principles in mind is why the data community frequently treats unit tests with skepticism. It is too often interpreted as simulating an external system such as Spark or data warehouse in a granular manner. Those are very complex systems that are impossible to emulate faithfully. Do not try to do so.

Unit tests are not acceptance tests. They should not be the judge of whether a computation is correct. However, unit testing -- when properly scoped -- is still valuable in data applications. There are massive classes of errors that we can address without interacting with external services and catch earlier in the process: refactoring errors, syntax errors in interpreted languages, configuration errors, graph structure errors, and so on. Errors caught in a fast feedback loop of unit testing can be addressed orders of magnitude faster than those caught during an expensive batch computation in staging or production.

So, unit tests should be viewed primarily as productivity and code quality tools, leading to more correct calculations. Here we will demonstrate how Dagster conveniently enables unit tests.

### Testing a Pipeline Execution

The workhouse function for unit-testing a pipeline is the <PyObject object="execute_pipeline" /> function. Using this function you can execute a pipeline in process and then test execution properties using the <PyObject object="PipelineExecutionResult" /> object that it returns.

```python file=/concepts/solids_pipelines/unit_tests.py startafter=start_test_pipeline_marker endbefore=end_test_pipeline_marker
def test_pipeline():
    result = execute_pipeline(do_math)

    # return type is PipelineExecutionResult
    assert isinstance(result, PipelineExecutionResult)
    assert result.success
    # inspect individual solid result
    assert result.output_for_solid("add_one") == 2
    assert result.output_for_solid("add_two") == 3
    assert result.output_for_solid("subtract") == -1
```

You can find more unit test examples in the [Examples](#examples) section below.

## Separating Business Logic from Environments

As noted above, data applications often rely on and encode their business logic in code that is executed by heavy, external dependencies. It means that it is easy and natural to couple your application to a single operating environment. However, then, if you do this, any testing requires your production environment.

To make local testing possible, you may structure your software to, as much as possible, cleanly separate this business logic from your operating environment. This is one of the reasons why Dagster flows through a context object throughout its entire computation.

Attached to the context is a set of user-defined [resources](/concepts/modes-resources). Examples of resources include APIs to data warehouses, Spark clusters, s3 sessions, or some other external dependency or service. We can then configure pipelines to be in different "modes", which can alter what version of the resource is vented to the user.

For example, in order to skip external dependencies in tests, you may find yourself needing to constantly comment and uncomment like:

```python file=/concepts/modes_resources/tests.py startafter=start_mode_test_before_marker endbefore=end_mode_test_before_marker
@solid
def get_data_without_resource(context):
    dummy_data = [1, 2, 3]
    # Do not call external apis in tests
    # return call_api()
    return dummy_data
```

### Testing the Same Pipeline in Different Modes

Dagster allows you to execute the same pipeline in different "modes". With modes and resources, you can modify the solid above to:

```python file=/concepts/modes_resources/tests.py startafter=start_mode_test_after_marker endbefore=end_mode_test_after_marker
@solid(required_resource_keys={"api"})
def get_data(context):
    return context.resources.api.call()


@solid
def do_something(context, data):
    output = process(data)
    return output


@pipeline(
    mode_defs=[
        ModeDefinition(name="unit_test", resource_defs={"api": ResourceDefinition.mock_resource()}),
        ModeDefinition(name="prod", resource_defs={"api": api_client}),
    ]
)
def download_pipeline():
    do_something(get_data())
```

In this example, the business logic (i.e., pipelines and solids) remains the same in two different environments (i.e., resources), while the `api` resource gets mocked in the test mode using a helper method [`mock_resource`](/\_apidocs/modes-resources#dagster.ResourceDefinition.mock_resource) from the <PyObject object="ResourceDefinition" /> class.

This is extremely helpful when it comes to testing, because to toggle between unit-testing the pipeline and running it in production, you can simply switch the mode name:

```python file=/concepts/modes_resources/tests.py startafter=start_execution_marker endbefore=end_execution_marker
def test_local():
    result = execute_pipeline(download_pipeline, mode="unit_test")
    assert result.success


def run_in_prod():
    execute_pipeline(download_pipeline, mode="prod")
```

For more information, you can check out the [Modes](/concepts/modes-resources) and [Resources](/concepts/modes-resources) sections.

## Examples

### Testing single solid execution

You can execute a single solid without writing a pipeline using <PyObject object="execute_solid" />. Using this function you will a single solid in an ephemeral pipeline. You can then then test execution properties using the <PyObject object="SolidExecutionResult" /> object that it returns.

```python file=/concepts/solids_pipelines/unit_tests.py startafter=start_test_solid_marker endbefore=end_test_solid_marker
def test_solid():
    result = execute_solid(add_one)

    # return type is SolidExecutionResult
    assert isinstance(result, SolidExecutionResult)
    assert result.success
    # check the solid output value
    assert result.output_value() == 2
```

### Testing pipeline execution with configs

Sometimes, you may want to test with different configuration. You can execute pipeline with a specified run config via the `run_config`:

```python file=/concepts/solids_pipelines/unit_tests.py startafter=start_test_pipeline_with_config endbefore=end_test_pipeline_with_config
def test_pipeline_with_config():
    result = execute_pipeline(
        do_math,
        run_config={
            "solids": {"add_one": {"inputs": {"num": 2}}, "add_two": {"inputs": {"num": 3}}}
        },
    )

    assert result.success

    assert result.output_for_solid("add_one") == 3
    assert result.output_for_solid("add_two") == 5
    assert result.output_for_solid("subtract") == -2
```

### Testing subset execution

You can execute only a subset of a pipeline via the `solid_selection` argument and test the result by checking `solid_result_list` which is a list of <PyObject object="SolidExecutionResult" />.

```python file=/concepts/solids_pipelines/unit_tests.py startafter=start_test_subset_execution endbefore=end_test_subset_execution
def test_subset_execution():
    result = execute_pipeline(
        do_math,
        solid_selection=["add_one", "add_two"],
    )

    assert result.success
    assert result.output_for_solid("add_one") == 2
    assert result.output_for_solid("add_two") == 3

    # solid_result_list returns List[SolidExecutionResult]
    # this checks to see that only two were executed
    assert {solid_result.solid.name for solid_result in result.solid_result_list} == {
        "add_one",
        "add_two",
    }
```

### Testing event stream

The event stream is the most generic way that a solid communicates what happened during its computation. Solids communicate events for starting, input/output type checking, and user-provided events such as expectations, materializations, and outputs.

```python file=/concepts/solids_pipelines/unit_tests.py startafter=start_test_event_stream endbefore=end_test_event_stream
def test_event_stream():
    pipeline_result = execute_pipeline(
        emit_events_pipeline, {"solids": {"emit_events_solid": {"inputs": {"input_num": 1}}}}
    )
    assert pipeline_result.success

    solid_result = pipeline_result.result_for_solid("emit_events_solid")

    assert isinstance(solid_result, SolidExecutionResult)

    # when one has multiple outputs, you need to specify output name
    assert solid_result.output_value(output_name="a_num") == 2

    assert [se.event_type for se in solid_result.step_events] == [
        DagsterEventType.STEP_START,
        DagsterEventType.STEP_INPUT,
        DagsterEventType.STEP_EXPECTATION_RESULT,
        DagsterEventType.ASSET_MATERIALIZATION,
        DagsterEventType.STEP_OUTPUT,
        DagsterEventType.HANDLED_OUTPUT,
        DagsterEventType.STEP_SUCCESS,
    ]

    # solids communicate what they did via the event stream, viewable in tools (e.g. dagit)
    (
        _start,
        _input_event,
        expectation_event,
        materialization_event,
        _num_output_event,
        _num_handled_output_operation,
        _success,
    ) = solid_result.step_events

    # apologies for verboseness here! we can do better.
    expectation_result = expectation_event.event_specific_data.expectation_result
    assert isinstance(expectation_result, ExpectationResult)
    assert expectation_result.success
    assert expectation_result.label == "positive"

    materialization = materialization_event.event_specific_data.materialization
    assert isinstance(materialization, AssetMaterialization)
    assert materialization.label == "persisted_string"
```
